鐵人賽已逐漸進入尾聲,前面二十多天,我們一步步擴充加強 Ktor 功能,也整合了 ORM, Redis 許多框架函式庫,整個系統架構及基礎設施功能已逐漸成形。今天我想儘可能把先前累積實作的功能都串連起來,以此為基礎開發 Multi-Channel Notifications 功能來當作範例。
今天會應用先前已實作的功能及概念,讀者可參考以前的文章
NotificationType
實作如何產生訊息內容及指定收件者即可,其餘底層寄送訊息的程式碼都是共用的Notification Plugin 會根據以下的設定,初始化 email, push, sms 3個 NotificationChannelSender
,每個 NotificationChannelSender
都有各自的細部設定可以調整。本機開發測試時,可以設定 mock = true 使用 MockNotificationChannelSender 只做寫入 log 的動作就好。
notification {
channels {
email {
mock = false
noReplyAddress = "no-reply@fanpoll.club"
#marketingAddress = ""
awsSES {
nettyHttpClient {
http {
#maxConcurrency = 50 # => aws default value = 50
#maxPendingConnectionAcquires = 10000 => aws default value = 10000
#maxIdleConnectionTimeout = 60s => aws default value = 60s
}
threadPool {
fixedPoolSize = 3
}
}
}
}
push {
mock = false
fcm {
# https://github.com/firebase/firebase-admin-java/issues/317
# https://github.com/googleapis/google-auth-library-java/issues/356
# By default failing requests are retried up to 10 times with exponential backoff.
# connectTimeout = 60000
readTimeout = 180000
threadPool {
fixedPoolSize = 3
}
}
}
sms {
mock = true
}
}
// 其餘省略
}
目前 NotificationChannelSender 包含以下實作子類別
maxReceiversPerRequest
=> 可以控制每次發送到第三方服務的訊息數量上限shutdown()
=> 當 Server 停止時會呼叫此方法關閉與第三方服務的連線物件或 thead pool,例如呼叫 AWS SesAsyncClient 的 close()
方法
interface NotificationChannelSender {
fun send(message: NotificationMessage)
val maxReceiversPerRequest: Int
fun shutdown() {}
}
class AwsSESSender(
config: AwsSESConfig,
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
class FCMSender(
config: FCMConfig,
private val pushTokenStorage: PushTokenStorage,
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
class MitakeSender(
private val config: MitakeConfig,
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
class MockNotificationChannelSender(
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
NotificationChannelSender 寄送訊息時會呼叫 send(message: NotificationMessage)
方法,NotificationMessage
除了訊息內容、收件者之外,還包含了許多額外資訊,方便未來查詢。
所以這3個 id 的階層關係由上到下的順序是 eventId → notificationId → notificationMessageId,我們可以查詢某個事件寄送的所有 Notification 及 NotificationMessage
data class NotificationMessage(
val notificationId: UUID,
val eventId: UUID,
val type: NotificationType,
val version: String? = null,
val channel: NotificationChannel,
val lang: Lang,
val sender: String? = null,
val receivers: List<String>,
val content: NotificationChannelContent,
var sendAt: Instant? = null
) : IdentifiableObject<UUID>() {
override val id: UUID = UUID.randomUUID()
fun toNotificationMessageLog(): NotificationMessageLog = NotificationMessageLog(
id, notificationId, eventId,
type, version, channel, lang, receivers
)
}
NotificationChannelSender 寄送訊息時,也會使用 LogWriter 記錄 log,包含第三方服務的回應碼及訊息…等,方便未來追蹤問題除錯。更多實作細節可參考以前的文章 [Day 20] 實作 Logging Plugin 建立系統 Logging 機制
notification {
logging {
enabled = true
destination = "AwsKinesis" # File(default), Database, AwsKinesis
logSuccess = false
logSuccessReqBody = false
logSuccessRspBody = false
}
// 其餘省略
}
@Serializable
data class NotificationMessageLog(
@Serializable(with = UUIDSerializer::class) override val id: UUID,
@Serializable(with = UUIDSerializer::class) val notificationId: UUID,
@Serializable(with = UUIDSerializer::class) val eventId: UUID,
val type: NotificationType,
val version: String? = null,
val channel: NotificationChannel,
val lang: Lang,
val receivers: List<String>,
@Serializable(with = InstantSerializer::class) var sendAt: Instant? = null,
// result
var successList: JsonArray? = null,
var failureList: JsonArray? = null,
var invalidRecipientIds: List<String>? = null,
// response detail
var rspCode: String? = null,
var rspMsg: String? = null,
@Serializable(with = InstantSerializer::class) var rspAt: Instant? = null,
var rspTime: Long? = null,
var rspBody: String? = null
) : LogMessage() {
@Serializable(with = InstantSerializer::class)
override val occurAt: Instant = Instant.now()
var content: String? = null
var success: Boolean = true
var errorMsg: String? = null
override val logLevel: LogLevel = if (success) LogLevel.INFO else LogLevel.ERROR
override val logType: String = LOG_TYPE
companion object {
const val LOG_TYPE = "notification_message"
}
}
實作完底層 NotificationChannelSender 串接第三方服務後,接下來是實作上層負責寄送通知的 NotificationSender,我希望寄送通知的程式寫法應該要非常簡單,當某個功能需要寄送通知時,只要先透過 Koin DI 取得 NotificationSender 物件,再呼叫 send(notification: Notification)
方法即可。
val notificationSender by inject<NotificationSender>()
notificationSender.send(notification)
interface NotificationSender {
fun send(notification: Notification)
fun shutdown() {}
}
當我們使用 NotificationSender 發送通知時,需要先建立 Notification 物件,填入訊息內容 NotificationContent 及收件者 Recipient…等資料。
@Serializable
data class Notification(
val type: NotificationType,
val recipients: MutableSet<Recipient> = mutableSetOf(),
val content: NotificationContent = NotificationContent(),
val contentArgs: MutableMap<String, String> = mutableMapOf(),
@Transient val templateArgs: MutableMap<String, Any> = mutableMapOf(), // templateArgs doesn't support i18n now
@Transient val lazyLoadArg: Any? = null,
val remote: Boolean = false,
val remoteArg: JsonObject? = null,
@Serializable(with = UUIDSerializer::class) override val id: UUID = UUID.randomUUID(),
@Serializable(with = UUIDSerializer::class) val eventId: UUID = UUID.randomUUID(),
@Serializable(with = InstantSerializer::class) val createAt: Instant = Instant.now(),
var version: String? = null
) : IdentifiableObject<UUID>() {
@Serializable(with = InstantSerializer::class)
var sendAt: Instant? = null
@Transient
val lazyLoad = type.isLazy()
fun load() = type.lazyLoad(this)
}
@Serializable
data class Recipient(
override val id: String,
val userType: UserType? = null,
@Serializable(with = UUIDSerializer::class) val userId: UUID? = null,
// val channels: Set<NotificationChannel>? = null, TODO => user notification preferences
val name: String? = null,
var lang: Lang? = null,
val email: String? = null,
val mobile: String? = null,
val pushTokens: Set<String>? = null
) : IdentifiableObject<String>()
@Serializable
data class NotificationContent(
val email: MutableMap<Lang, EmailContent> = mutableMapOf(),
val push: MutableMap<Lang, PushContent> = mutableMapOf(),
val sms: MutableMap<Lang, SMSContent> = mutableMapOf()
)
@Serializable
class EmailContent(
var subject: String? = null,
var body: String? = null,
@Transient val attachments: List<Attachment>? = null
)
@Serializable
class PushContent(
val title: String,
val body: String,
val data: Map<String, String>? = null
)
@Serializable
class SMSContent(
val body: String
)
findRecipients(userFilters: Map<UserType, String>?)
方法@Serializable
open class NotificationType(
val projectId: String,
val name: String,
val channels: Set<NotificationChannel>,
val category: NotificationCategory,
// val priority: NotificationPriority TODO => priority queues
val version: String? = null,
val lang: Lang? = null,
@Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : IdentifiableObject<String>() {
override val id: String = "${projectId}_${name}"
fun isLazy(): Boolean = lazyLoadBlock != null
fun lazyLoad(notification: Notification) {
requireNotNull(lazyLoadBlock)
lazyLoadBlock.invoke(this, notification)
}
open fun findRecipients(userFilters: Map<UserType, String>?): Set<Recipient> =
error("NotificationType $id findRecipients is not yet implemented")
}
enum class NotificationChannel {
Email, Push, SMS
}
enum class NotificationCategory {
System, Marketing
}
enum class NotificationPriority {
URGENT, HIGH, LOW
}
class NotificationDispatcher(
private val config: NotificationChannelConfig,
private val envMode: EnvMode,
private val availableLangs: AvailableLangs,
private val i18nNotificationProjectMessages: I18nNotificationProjectMessages,
private val emailSender: NotificationChannelSender? = null,
private val pushSender: NotificationChannelSender? = null,
private val smsSender: NotificationChannelSender? = null
) : NotificationSender
NotificationDispatcher 也實作 NotificationSender 介面,當呼叫 send(notification: Notification)
方法時,會根據 Notification 的 NotificationType 所定義的 channels: Set<NotificationChannel>
,呼叫對應 NotificationChannelSender 的 send(message: NotificationMessage)
方法
這裡使用 decorator pattern 的手法,讓 NotificationDispatcher 及 NotificationCoroutineActor 都實作 NotificationSender 介面,所以呼叫 send(notification: Notification) 方法時,會 delegate 到內部的 NotificationSender,執行順序會是 NotificationCoroutineActor → NotificationDispatcher → NotificationChannelSender。更多 NotificationCoroutineActor 的實作細節,請參考 [Day 21] 使用 Coroutine SendChannel 處理非同步工作
class NotificationCoroutineActor(
coroutineActorConfig: CoroutineActorConfig,
private val notificationSender: NotificationSender,
private val logWriter: LogWriter
) : NotificationSender {
private val logger = KotlinLogging.logger {}
private val actorName = "NotificationActor"
private val actor: CoroutineActor<Notification> = CoroutineActor(
actorName, Channel.UNLIMITED,
coroutineActorConfig, Dispatchers.IO,
this::execute, null,
logWriter
)
override fun send(notification: Notification) {
actor.sendToUnlimitedChannel(notification, InfraResponseCode.NOTIFICATION_ERROR) // non-blocking by Channel.UNLIMITED
}
private fun execute(notification: Notification) {
try {
notificationSender.send(notification)
} catch (e: Throwable) {
val errorMsg = "$actorName execute error"
logger.error("errorMsg => $notification", e)
logWriter.write(
ErrorLog.internal(
InternalServerException(
InfraResponseCode.NOTIFICATION_ERROR, errorMsg, e,
mapOf("id" to notification.id, "type" to notification.type, "eventId" to notification.eventId)
),
actorName, notification.id.toString()
)
)
}
}
override fun shutdown() {
notificationSender.shutdown()
actor.close()
}
}
最後實作 Notification Plugin 初始化 NotificationDispatcher、NotificationCoroutineActor 及所有 NotificationChannelSender,並且都註冊至 Koin DI。當某個功能需要寄送通知時,只要先透過 Koin DI 取得 NotificationSender 物件,再呼叫 send(notification: Notification) 方法即可。
install(NotificationFeature)
override fun install(pipeline: Application, configure: Configuration.() -> Unit): NotificationFeature {
pipeline.koin {
modules(
module(createdAtStart = true) {
val notificationDispatcher = NotificationDispatcher(
channelConfig,
envMode,
availableLangs,
i18nNotificationProjectMessagesProviders,
emailSender,
pushSender,
smsSender
)
val notificationSender = config.asyncExecutor?.let {
NotificationCoroutineActor(it.coroutineActor, notificationDispatcher, logWriter)
} ?: notificationDispatcher
single { notificationSender }
KoinApplicationShutdownManager.register { notificationSender.shutdown() }
// 其餘省略
}
)
}
}
為了支援多國語言的訊息內容,我實作了 I18nNotificationProjectMessages 從語系檔 notification_${lang}.properties 讀取訊息文字,例如下面是 ops 子專案的 dataReport
NotificationType 的 Email 主旨,詳細的實作細節可參考之前的文章 [Day 11] 實作 Ktor i18n 機制
# format => ${type}.${channel}.${part}=""
ops_dataReport.Email.subject=[維運] 資料查詢報表: ${dataType} ${queryTime}
至於 Email 的 body 內容就需要準備每個語言的 html 樣板檔案
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
</head>
<body>
<div id="content">
<ul>
<li>
查詢時間: ${queryTime}
</li>
<li>
查詢資料類型: ${dataType}
</li>
<li>
查詢條件: ${query}
</li>
</ul>
</div>
</body>
</html>
然後實作 NotificationTemplateProcessor 使用 FreeMarker 載入 html 樣板檔案,並替換裡面的變數值
class NotificationTemplateProcessor(
private val availableLangs: AvailableLangs
) {
private val cfg: Configuration = Configuration(Configuration.VERSION_2_3_31).apply {
templateLoader = ClassTemplateLoader(NotificationTemplateProcessor::class.java, "/i18n/notification/templates")
templateExceptionHandler = TemplateExceptionHandler.RETHROW_HANDLER
logTemplateExceptions = false
wrapUncheckedExceptions = true
recognizeStandardFileExtensions = false
defaultEncoding = "UTF-8"
outputFormat = HTMLOutputFormat.INSTANCE
locale = availableLangs.first().locale
timeZone = TimeZone.getTimeZone(ZoneId.of("UTC"))
dateFormat = "yyyy-MM-dd"
dateTimeFormat = "yyyy-MM-dd HH:mm:ss"
}
fun processEmail(type: NotificationType, args: Map<String, Any>, lang: Lang): String =
process(type.id, args, lang, "html")
private fun process(templateName: String, args: Map<String, Any>, lang: Lang, ext: String): String {
val templateFileName = buildTemplateFileName(templateName, lang, ext)
val template: Template = try {
cfg.getTemplate(templateFileName, null, null, null, true, true)
?: cfg.getTemplate(buildTemplateFileName(templateName, availableLangs.first(), ext))
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.DEV_ERROR, "template file $templateFileName parsing error or not found", e)
}
return try {
val outputWriter = StringWriter()
template.process(args, outputWriter)
outputWriter.toString()
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.DEV_ERROR, "process template error: template = $templateName, args = $args", e)
}
}
private fun buildTemplateFileName(templateName: String, lang: Lang, ext: String) = "${templateName}_${lang.code}.$ext"
}
實作的目標是 => 管理者可以填寫訊息文字,並撰寫 QueryDSL 只傳送訊息給符合查詢條件的使用者。以下面的 request body json 為例,系統只會寄送中文測試通知給符合查詢條件使用者 (1980年後出生的男性,而且為啟用狀態的使用者)
{
"userFilters": {
"club_user": "[enabled = true and gender = Male and birthYear >= 1980]"
},
"content": {
"email": {
"zh-TW": {
"subject": "測試Email",
"body": "我是Email內容"
}
},
"push": {
"zh-TW": {
"title": "測試推播",
"body": "我是推播內容"
}
}
}
}
首先建立 POST /club/users/sendNotification
API,只有 ClubAuth.Admin 角色的使用者可以呼叫。接下來是透過 Koin DI 取得 NotificationSender,然後建立 NotificationType 為 SendNotification 的 Notification 物件,最後再呼叫 send() 方法即可發送通知。
由於是非同步發送通知,所以 API 會回傳 notification 的 id,管理者可以稍後使用此 id 去 Kibana 搜尋 log 查詢寄送結果
fun Routing.clubUser() {
val notificationSender by inject<NotificationSender>()
route("${ClubConst.urlRootPath}/users") {
authorize(ClubAuth.Admin) {
post<SendNotificationForm, UUID>("/sendNotification", ClubOpenApi.SendNotification) { form ->
val notification = form.toNotification(ClubNotification.SendNotification)
notificationSender.send(notification)
call.respond(DataResponseDTO.uuid(notification.id))
}
}
}
}
@Serializable
data class SendNotificationForm(
val recipients: MutableSet<Recipient>? = null, //直接填入 Recipient 資料
val userFilters: Map<UserType, String>? = null, // 或是撰寫 QueryDSL 查詢使用者
val content: NotificationContent, // 直接填寫通知訊息內容
val contentArgs: MutableMap<String, String>? = null
): Form<SendNotificationForm>() {
fun toNotification(type: NotificationType): Notification {
content.email.values.forEach {
it.body = buildEmailHtmlBody(it.body!!)
}
val recipients = recipients ?: type.findRecipients(userFilters)
if (recipients.isEmpty()) {
throw RequestException(InfraResponseCode.QUERY_RESULT_EMPTY, "recipients is empty")
}
return Notification(
type, recipients = recipients.toMutableSet(),
content, contentArgs = contentArgs ?: mutableMapOf(),
remote = false
)
}
// 其餘省略
}
我在 ClubNotification.kt 檔案定義了 Club 子專案所有的 NotificationType。每個 NotificationType 都是子類別 ClubNotificationType
物件,ClubNotificationType
已實作 findRecipients(userFilters: Map<UserType, String>?)
方法,可以從資料庫載入 Club 子專案中符合 QueryDSL 查詢條件的使用者資料,包括 email, lang…等欄位,建立 Recipient 物件。
object ClubNotification {
val SendNotification = ClubNotificationType(
"sendNotification",
setOf(NotificationChannel.Push, NotificationChannel.Email), NotificationCategory.System
)
}
class ClubNotificationType(
name: String,
channels: Set<NotificationChannel>,
category: NotificationCategory,
@Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : NotificationType(ClubConst.projectId, name, channels, category, null, null, lazyLoadBlock) {
override fun findRecipients(userFilters: Map<UserType, String>?): Set<Recipient> {
val userFilter = userFilters?.get(ClubUserType.User.value)?.let {
if (it.isBlank()) null
else DynamicDBQuery.convertPredicate(DynamicQuery.parseFilter(it), UserDTO.mapper)
}
return transaction {
val query = ClubUserTable.join(UserDeviceTable, JoinType.LEFT, ClubUserTable.id, UserDeviceTable.userId) {
UserDeviceTable.enabled eq true
}.slice(
ClubUserTable.id, ClubUserTable.account, ClubUserTable.name,
ClubUserTable.email, ClubUserTable.mobile, ClubUserTable.lang,
UserDeviceTable.id, UserDeviceTable.pushToken
).select { ClubUserTable.enabled eq true }
userFilter?.let { query.adjustWhere { it } }
query.toList().toDTO(UserDTO::class).map { user ->
with(user) {
Recipient(
account!!, ClubUserType.User.value, id, name, lang, email, mobile,
devices?.mapNotNull { it.pushToken }?.toSet()
)
}
}.toSet()
}
}
}
實作的目標是 => 管理者可以撰寫 QueryDSL 查詢 User 資料表,把資料匯出成 Excel 檔案,寄送至指定 email。以下面的 request body json 為例,系統會查詢「角色為 AppTeam 而且為啟用狀態」的使用者資料 account, name 欄位,匯出成 Excel 檔案,寄送 email 至 admin@test.abc.com
{
"dataType": "OpsUser",
"email": "admin@test.abc.com",
"query": "q_fields=account,name&q_filter=[role = AppTeam and enabled = true]&q_orderBy=createdAt"
}
查詢 SQL => SELECT ops_user.id, ops_user.account, ops_user."name" FROM ops_user WHERE (ops_user."role" = 'AppTeam') AND (ops_user.enabled = true) ORDER BY ops_user.created_at ASC
首先建立 POST /ops/data/report API,只有 OpsAuth.Admin 角色的使用者可以呼叫。接下來是透過 Koin DI 取得 NotificationSender,然後建立 NotificationType 為 DataReport 的 Notification 物件,最後再呼叫 send() 方法即可發送通知。
由於是非同步發送通知,所以 API 會回傳 notification 的 id,管理者可以稍後使用此 id 去 Kibana 搜尋 log 查詢寄送結果
fun Routing.opsDataReport() {
val notificationSender by inject<NotificationSender>()
route("${OpsConst.urlRootPath}/data/report") {
authorize(OpsAuth.OpsTeam) {
post<DataReportForm, UUID>(OpsOpenApi.DataReport) { form ->
val notification = Notification(OpsNotification.DataReport, lazyLoadArg = form)
notificationSender.send(notification)
call.respond(DataResponseDTO.uuid(notification.id))
}
}
}
}
@Serializable
class DataReportForm(
val dataType: DataReportDataType,
val query: String,
var email: String? = null
) : Form<DataReportForm>()
enum class DataReportDataType(val entityDTOType: KType) {
OpsUser(typeOf<UserDTO>())
}
我在 OpsNotification.kt 檔案定義了 Ops 子專案所有的 NotificationType。每個 NotificationType 都是子類別 OpsNotificationType 物件。DataReport 的 lazyLoadBlock
lambda 會根據 QueryDSL 查詢得到 List 物件,再匯出成 Excel 檔案。
object OpsNotification {
val DataReport = OpsNotificationType("dataReport") { notification ->
val form = notification.lazyLoadArg as DataReportForm
requireNotNull(form.email)
notification.recipients.add(Recipient(form.email!!, email = form.email))
val dtoClass = form.dataType.entityDTOType.classifier as KClass<EntityDTO<*>>
val dtoList = transaction {
DynamicQuery.from(form.query).toDBQuery(dtoClass).toList(dtoClass)
}
val columnIds = ReportDataUtils.getColumnIds(dtoClass)
val table = Table(form.dataType.name, columnIds)
dtoList.forEach { table.addRow(ReportDataUtils.toMap(it, columnIds)) }
val report = ReportData(id, name, mutableListOf(table))
val queryTime = Instant.now()
val args = mapOf(
"dataType" to form.dataType.name,
"queryTime" to DateTimeUtils.TAIWAN_DATE_TIME_FORMATTER.format(queryTime),
"query" to form.query
)
notification.templateArgs.putAll(args)
val fileName = "${this.name}_${args["dataType"]}_${args["queryTime"]}"
val attachment = report.toExcelAttachment(fileName)
notification.content.email[lang!!] = EmailContent(attachments = listOf(attachment))
}
private val notificationType = typeOf<OpsNotificationType>()
val AllTypes = OpsNotification::class.memberProperties
.filter { it.returnType == notificationType }
.map { it.getter.call(this) as OpsNotificationType }
}
class OpsNotificationType(
name: String,
@Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : NotificationType(
OpsConst.projectId, name, setOf(NotificationChannel.Email),
NotificationCategory.System, null, Lang.SystemDefault, lazyLoadBlock
)
要填入 dataType
, queryTime
, query
3個 templateArgs 給 FreeMarker 替換 email template 變數
ops_dataReport.Email.subject=[維運] 資料查詢報表: ${dataType} ${queryTime}
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
</head>
<body>
<div id="content">
<ul>
<li>
查詢時間: ${queryTime}
</li>
<li>
查詢資料類型: ${dataType}
</li>
<li>
查詢條件: ${query}
</li>
</ul>
</div>
</body>
</html>